package com.example.spider.cluster.spider;

import com.example.spider.cluster.common.dto.StandMqMessage;
import com.example.spider.cluster.common.dto.SpiderResultDTO;
import com.example.spider.cluster.common.stream.SpiderChannel;
import com.example.spider.cluster.common.stream.SpiderQueueDefinition;
import com.example.spider.common.config.SpiderResultHandlerAutoConfiguration;
import com.example.spider.common.spider.Spider;
import com.example.spider.common.task.SpiderTask;
import com.example.spider.cluster.common.task.SpiderTaskTypeManager;
import com.example.spider.common.util.JsonUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author lym
 */
@Slf4j
@Service
@EnableBinding(SpiderChannel.class)
public class SpiderManager {

    @Autowired
    @Qualifier(SpiderResultHandlerAutoConfiguration.HANDLER)
    private ThreadPoolExecutor executor;

    @Autowired
    private SpiderChannel spiderChannel;

    @Autowired
    private Spider spider;

    @StreamListener(SpiderQueueDefinition.SPIDER_TASK_IN)
    public void input(Message<StandMqMessage> spiderTaskMsg, @Header(AmqpHeaders.CHANNEL) Channel channel,
                      @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
        try {
            log.debug("received SpiderTask");
            executor.execute(() -> {
                StandMqMessage standMqMessage = spiderTaskMsg.getPayload();
                SpiderTask task = SpiderTaskTypeManager.toSpiderTask(standMqMessage.getData(), standMqMessage.getType());
                String result = spider.doSpider(task);
                sendResult(task, result);
            });
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("error when received SpiderTask", e);
        }
    }

    public <T extends SpiderTask> void sendResult(T spiderTask, String result) {
        SpiderResultDTO<T> resultDTO = new SpiderResultDTO<>();
        resultDTO.setSpiderTask(spiderTask);
        resultDTO.setResult(result);
        spiderChannel.putesult().send(MessageBuilder.withPayload(resultDTO.toStandMqMessage()).build());
    }

}
